Flink 1.15 Source Code Analysis: Starting the JobManager, WebMonitorEndpoint Startup

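Foreword: This article was compiled by the editors of 编程笔记 (Programming Notes). It covers Flink 1.15 source code analysis: starting the JobManager and starting the WebMonitorEndpoint. We hope you find it useful.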




Contents


  • 1. Introduction
  • 2. Building the WebMonitorEndpoint
    • 2.1 Initializing restEndpointFactory
    • 2.2 createRestEndpoint creates the WebMonitorEndpoint

  • 3. Starting the WebMonitorEndpoint
    • 3.1 Router
    • 3.2 Registering the handlers
    • 3.3 Starting Netty
      • 3.3.1 Initializing the ChannelInitializer
      • 3.3.2 Initializing the NioEventLoopGroups
      • 3.3.3 Binding the REST endpoint
      • 3.3.4 The REST address is ready
      • 3.3.5 Set the endpoint state to RUNNING; at this point the WebMonitorEndpoint's Netty server is fully started

    • 3.4 A hook to start subclass-specific services
      • 3.4.1 Leader election
        • 3.4.1.1 Standalone mode: a StandaloneLeaderElectionService is created
        • 3.4.1.2 ZooKeeper HA mode: a DefaultLeaderElectionService is created



  • 4. Summary
  • Back to [Flink 1.15 Source Code Analysis: Table of Contents](https://blog.csdn.net/wuxintdrh/article/details/127796678)


1. Introduction

In the previous article, Flink 1.15 Source Code Analysis: DispatcherResourceManagerComponent, we saw where the WebMonitorEndpoint is created and started:

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

// Build a thread pool that executes the client requests received by the WebMonitorEndpoint
final ScheduledExecutorService executor =
        WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");

// Initialize the MetricFetcher; the default update interval is 10 s
final long updateInterval =
        configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);

// Create the WebMonitorEndpoint, one of the three core components
webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);

// Start the WebMonitorEndpoint, one of the three core components
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
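In the snippet above, an update interval of 0 selects VoidMetricFetcher.INSTANCE instead of a real fetcher. That is the Null Object pattern: callers always receive a usable fetcher, and "disabled" simply means a fetcher that does nothing. The sketch below illustrates only that decision; Fetcher, NoOpFetcher and PeriodicFetcher are hypothetical names, not Flink classes, and Flink's VoidMetricFetcher may differ in detail.

```java
public class NullObjectFetcherSketch {

    // Hypothetical interface standing in for a metric fetcher.
    interface Fetcher {
        void update();

        int fetchCount();
    }

    // Null Object: satisfies the interface but never fetches anything.
    enum NoOpFetcher implements Fetcher {
        INSTANCE;

        @Override
        public void update() {}

        @Override
        public int fetchCount() {
            return 0;
        }
    }

    // Stand-in for a real fetcher; it only counts how often update() ran.
    static final class PeriodicFetcher implements Fetcher {
        private int count;

        @Override
        public void update() {
            count++;
        }

        @Override
        public int fetchCount() {
            return count;
        }
    }

    // Same decision as in the Flink snippet: an interval of 0 disables fetching.
    static Fetcher create(long updateIntervalMillis) {
        return updateIntervalMillis == 0 ? NoOpFetcher.INSTANCE : new PeriodicFetcher();
    }

    public static void main(String[] args) {
        Fetcher disabled = create(0);
        Fetcher enabled = create(10_000);
        disabled.update();
        enabled.update();
        System.out.println(disabled.fetchCount() + " " + enabled.fetchCount()); // 0 1
    }
}
```

Keeping the `updateInterval == 0` check in one factory method means the REST handlers that read metrics never need their own null checks.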

This article walks through, in detail, how the WebMonitorEndpoint is built and started.


2. Building the WebMonitorEndpoint

WebMonitorEndpoint is built by restEndpointFactory. Which restEndpointFactory is used depends on how the cluster is started: it is chosen when the entrypoint creates its DispatcherResourceManagerComponentFactory.

Below, we use StandaloneSessionClusterEntrypoint as an example to see how restEndpointFactory is initialized.


2.1 Initializing restEndpointFactory

1. StandaloneSessionClusterEntrypoint creates the DefaultDispatcherResourceManagerComponentFactory
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#createDispatcherResourceManagerComponentFactory

@Override
protected DefaultDispatcherResourceManagerComponentFactory
        createDispatcherResourceManagerComponentFactory(Configuration configuration) {
    return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
            StandaloneResourceManagerFactory.getInstance());
}

2. createSessionComponentFactory creates the factories for the three core components
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createSessionComponentFactory

public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
        ResourceManagerFactory<?> resourceManagerFactory) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            DefaultDispatcherRunnerFactory.createSessionRunner(
                    SessionDispatcherFactory.INSTANCE),
            resourceManagerFactory,
            SessionRestEndpointFactory.INSTANCE);
}

So in session mode, restEndpointFactory is SessionRestEndpointFactory.INSTANCE.


2.2 createRestEndpoint creates the WebMonitorEndpoint

SessionRestEndpointFactory, an implementation of RestEndpointFactory, creates a DispatcherRestEndpoint:

/** {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}. */
public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
    INSTANCE;

    @Override
    public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
            Configuration configuration,
            LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
            LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            TransientBlobService transientBlobService,
            ScheduledExecutorService executor,
            MetricFetcher metricFetcher,
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        final RestHandlerConfiguration restHandlerConfiguration =
                RestHandlerConfiguration.fromConfiguration(configuration);

        // Create the DispatcherRestEndpoint
        return new DispatcherRestEndpoint(
                dispatcherGatewayRetriever,
                configuration,
                restHandlerConfiguration,
                resourceManagerGatewayRetriever,
                transientBlobService,
                executor,
                metricFetcher,
                leaderElectionService,
                RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
                fatalErrorHandler);
    }
}


The DispatcherRestEndpoint created here is the REST endpoint of the Dispatcher:

/** REST endpoint for the {@link Dispatcher} component. */
public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
    // ......
}
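SessionRestEndpointFactory is a single-constant enum that implements a generic factory interface and returns the concrete DispatcherRestEndpoint typed as its generic base class. The sketch below reproduces only that shape; every type in it (Gateway, Endpoint, EndpointFactory, SessionLikeEndpointFactory) is a hypothetical stand-in, not a Flink class.

```java
public class EnumFactorySketch {

    // Hypothetical marker types standing in for gateway type parameters.
    interface Gateway {}

    interface DispatcherLikeGateway extends Gateway {}

    // Generic base endpoint, in the role WebMonitorEndpoint<T> plays.
    abstract static class Endpoint<T extends Gateway> {
        abstract String describe();
    }

    // Concrete endpoint, in the role DispatcherRestEndpoint plays.
    static final class DispatcherLikeEndpoint extends Endpoint<DispatcherLikeGateway> {
        @Override
        String describe() {
            return "REST endpoint for the dispatcher";
        }
    }

    interface EndpointFactory<T extends Gateway> {
        Endpoint<T> createEndpoint();
    }

    // A single-constant enum is a thread-safe, serialization-safe singleton,
    // which is why a stateless factory can be exposed as SomeFactory.INSTANCE.
    enum SessionLikeEndpointFactory implements EndpointFactory<DispatcherLikeGateway> {
        INSTANCE;

        @Override
        public Endpoint<DispatcherLikeGateway> createEndpoint() {
            return new DispatcherLikeEndpoint();
        }
    }

    public static void main(String[] args) {
        EndpointFactory<DispatcherLikeGateway> factory = SessionLikeEndpointFactory.INSTANCE;
        System.out.println(factory.createEndpoint().describe());
    }
}
```

Callers depend only on the factory interface, so swapping in a factory for another deployment mode does not change the code that calls createEndpoint().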


3. Starting the WebMonitorEndpoint

webMonitorEndpoint.start() actually runs org.apache.flink.runtime.rest.RestServerEndpoint#start, which WebMonitorEndpoint inherits from its parent class.


3.1 Router

// 1. First, create a Router that parses client requests and finds the matching handler
final Router router = new Router();

3.2 Registering the handlers

// 2. Register the handlers
// 2.1 Initialize the handlers
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
handlers = initializeHandlers(restAddressFuture);

// 2.2 Sort the handlers; the ordering establishes a one-to-one mapping between URLs and handlers
/* sort the handlers such that they are ordered the following:
 * /jobs
 * /jobs/overview
 * /jobs/:jobid
 * /jobs/:jobid/config
 * /:*
 */
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

// 2.3 After sorting, checkAllEndpointsAndHandlersAreUnique verifies uniqueness
checkAllEndpointsAndHandlersAreUnique(handlers);

// 2.4 Register the handlers with the router
handlers.forEach(handler -> registerHandler(router, handler, log));
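The ordering in the comment above puts static path segments before parameterized ones (`:jobid`) and the catch-all `/:*` last. One way to get exactly that order is a comparator that ranks `:` above every other character, followed by a uniqueness check on method + URL. This is a sketch under that assumption; it is not the source of RestHandlerUrlComparator or checkAllEndpointsAndHandlersAreUnique, and the Endpoint class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HandlerOrderingSketch {

    // Hypothetical stand-in for the (HTTP method, URL template) a REST handler serves.
    static final class Endpoint {
        final String method;
        final String url;

        Endpoint(String method, String url) {
            this.method = method;
            this.url = url;
        }
    }

    // Character-by-character comparison in which ':' (start of a path parameter or
    // wildcard) ranks above every other character, so static segments come before
    // parameterized ones and a shorter prefix comes before its extensions.
    static final Comparator<String> URL_ORDER = (a, b) -> {
        int n = Math.min(a.length(), b.length());
        for (int i = 0; i < n; i++) {
            char ca = a.charAt(i);
            char cb = b.charAt(i);
            if (ca == cb) {
                continue;
            }
            if (ca == ':') {
                return 1;
            }
            if (cb == ':') {
                return -1;
            }
            return Character.compare(ca, cb);
        }
        return Integer.compare(a.length(), b.length());
    };

    static List<Endpoint> sort(List<Endpoint> endpoints) {
        List<Endpoint> sorted = new ArrayList<>(endpoints);
        sorted.sort(Comparator.comparing((Endpoint e) -> e.url, URL_ORDER)
                .thenComparing(e -> e.method));
        return sorted;
    }

    // Rejects two handlers that claim the same method + URL combination.
    static void checkUnique(List<Endpoint> endpoints) {
        Set<String> seen = new HashSet<>();
        for (Endpoint e : endpoints) {
            String key = e.method + " " + e.url;
            if (!seen.add(key)) {
                throw new IllegalStateException("Duplicate REST endpoint: " + key);
            }
        }
    }

    public static void main(String[] args) {
        List<Endpoint> handlers = List.of(
                new Endpoint("GET", "/:*"),
                new Endpoint("GET", "/jobs/:jobid/config"),
                new Endpoint("GET", "/jobs"),
                new Endpoint("GET", "/jobs/:jobid"),
                new Endpoint("GET", "/jobs/overview"));
        List<Endpoint> sorted = sort(handlers);
        checkUnique(sorted);
        sorted.forEach(e -> System.out.println(e.url));
    }
}
```

Running main prints the five URLs in the same order as the comment in RestServerEndpoint#start.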

3.3 Starting Netty


3.3.1 Initializing the ChannelInitializer

// 3.1 Initialize the ChannelInitializer
ChannelInitializer<SocketChannel> initializer =
        new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws ConfigurationException {
                RouterHandler handler = new RouterHandler(router, responseHeaders);

                // SSL should be the first handler in the pipeline
                if (isHttpsEnabled()) {
                    ch.pipeline()
                            .addLast(
                                    "ssl",
                                    new RedirectingSslHandler(
                                            restAddress,
                                            restAddressFuture,
                                            sslHandlerFactory));
                }

                ch.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new FileUploadHandler(uploadDir))
                        .addLast(
                                new FlinkHttpObjectAggregator(
                                        maxContentLength, responseHeaders));

                for (InboundChannelHandlerFactory factory :
                        inboundChannelHandlerFactories) {
                    Optional<ChannelHandler> channelHandler =
                            factory.createHandler(configuration, responseHeaders);
                    if (channelHandler.isPresent()) {
                        ch.pipeline().addLast(channelHandler.get());
                    }
                }

                ch.pipeline()
                        .addLast(new ChunkedWriteHandler())
                        .addLast(handler.getName(), handler)
                        .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };
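Netty passes inbound data through a channel's handlers in the order they were added with addLast, so the order in initChannel is significant: TLS first (so later handlers see decrypted bytes), then HTTP decoding, upload handling and aggregation into full requests, then any optional handlers, the router, and PipelineErrorHandler at the end. The sketch below models only that ordering logic, with strings standing in for the real Netty ChannelHandler objects; buildPipeline and its Supplier-based factory list are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class PipelineOrderSketch {

    // Returns handler names in the order initChannel() adds them with addLast().
    // Strings stand in for the real Netty ChannelHandler instances.
    static List<String> buildPipeline(
            boolean httpsEnabled, List<Supplier<Optional<String>>> extraFactories) {
        List<String> pipeline = new ArrayList<>();
        if (httpsEnabled) {
            pipeline.add("ssl"); // must come first so later handlers see decrypted bytes
        }
        pipeline.add("HttpServerCodec");
        pipeline.add("FileUploadHandler");
        pipeline.add("FlinkHttpObjectAggregator");
        for (Supplier<Optional<String>> factory : extraFactories) {
            factory.get().ifPresent(pipeline::add); // a factory may decline to add a handler
        }
        pipeline.add("ChunkedWriteHandler");
        pipeline.add("RouterHandler");
        pipeline.add("PipelineErrorHandler"); // last: handles whatever nothing else handled
        return pipeline;
    }

    public static void main(String[] args) {
        List<Supplier<Optional<String>>> factories =
                List.of(() -> Optional.of("CustomInboundHandler"), () -> Optional.empty());
        System.out.println(buildPipeline(true, factories));
    }
}
```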

3.3.2 Initializing the NioEventLoopGroups

NioEventLoopGroup bossGroup =
        new NioEventLoopGroup(
                1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup =
        new NioEventLoopGroup(
                0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

bootstrap = new ServerBootstrap();
bootstrap
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(initializer);
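The boss group gets one thread, which accepts incoming connections; the worker group is created with 0 threads, which Netty's MultithreadEventLoopGroup treats as "use the default": the io.netty.eventLoopThreads system property if set, otherwise twice the number of available processors, and never fewer than 1. The sketch reproduces that rule as a pure function so it can be checked; effectiveThreads is a hypothetical helper, and the system property is passed in explicitly rather than read.

```java
import java.util.OptionalInt;

public class EventLoopThreadsSketch {

    // Fallback rule for event loop groups created with nThreads == 0:
    // io.netty.eventLoopThreads if set, otherwise 2 * available processors, at least 1.
    static int effectiveThreads(
            int requested, int availableProcessors, OptionalInt eventLoopThreadsProperty) {
        if (requested != 0) {
            return requested;
        }
        int fallback = eventLoopThreadsProperty.orElse(availableProcessors * 2);
        return Math.max(1, fallback);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("boss threads:   " + effectiveThreads(1, cpus, OptionalInt.empty()));
        System.out.println("worker threads: " + effectiveThreads(0, cpus, OptionalInt.empty()));
    }
}
```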

3.3.3 Binding the REST endpoint

// 3.3 Binding rest endpoint
// 3.3.1 Get the range of candidate ports
Iterator<Integer> portsIterator;
try {
    portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
    throw e;
} catch (Exception e) {
    throw new IllegalArgumentException(
            "Invalid port range definition: " + restBindPortRange);
}

// 3.3.2 Handle port conflicts by trying each port in the range in turn
int chosenPort = 0;
while (portsIterator.hasNext()) {
    try {
        chosenPort = portsIterator.next();
        final ChannelFuture channel;
        // Bind to address:port to obtain the channel
        if (restBindAddress == null) {
            channel = bootstrap.bind(chosenPort);
        } else {
            channel = bootstrap.bind(restBindAddress, chosenPort);
        }
        serverChannel = channel.syncUninterruptibly().channel();
        break;
    } catch (final Exception e) {
        // syncUninterruptibly() throws checked exceptions via Unsafe
        // continue if the exception is due to the port being in use, fail early
        // otherwise
        if (!(e instanceof java.net.BindException)) {
            throw e;
        }
    }
}

if (serverChannel == null) {
    throw new BindException(
            "Could not start rest endpoint on any port in port range "
                    + restBindPortRange);
}

log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if (bindAddress.getAddress().isAnyLocalAddress()) {
    advertisedAddress = this.restAddress;
} else {
    advertisedAddress = bindAddress.getAddress().getHostAddress();
}

port = bindAddress.getPort();

log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
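The binding loop above treats a BindException as "this port is taken, try the next one" and any other failure as fatal. The same strategy can be sketched with plain java.net.ServerSocket, without Netty. parsePortRange is a hypothetical parser for specs like "50100-50200" or "50100,50101"; it is not NetUtils.getPortRangeFromString, and edge cases (whitespace, invalid ranges) may be handled differently there.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PortRangeBindSketch {

    // Hypothetical parser for specs such as "50100-50200" or "50100,50101,50200-50210".
    static List<Integer> parsePortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(p));
            } else {
                int start = Integer.parseInt(p.substring(0, dash).trim());
                int end = Integer.parseInt(p.substring(dash + 1).trim());
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    // Tries each port in turn: a BindException means "port in use, try the next one",
    // any other I/O error aborts immediately, like the Netty loop above.
    static ServerSocket bindFirstAvailable(Iterator<Integer> ports, InetAddress bindAddress)
            throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(bindAddress == null
                        ? new InetSocketAddress(port)
                        : new InetSocketAddress(bindAddress, port));
                return socket;
            } catch (BindException e) {
                socket.close(); // port in use: move on
            } catch (IOException e) {
                socket.close();
                throw e; // anything else: fail early
            }
        }
        throw new BindException("Could not bind to any port in the given range");
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket socket =
                bindFirstAvailable(parsePortRange("50100-50105").iterator(), loopback)) {
            System.out.println("bound to port " + socket.getLocalPort());
        }
    }
}
```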

3.3.4 The REST address is ready

restAddressFuture.complete(restBaseUrl);
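restAddressFuture was created in step 2.1, before any port was bound, and handed to initializeHandlers and to the RedirectingSslHandler. Completing it here publishes the final base URL to everything that registered interest earlier. A minimal sketch of that hand-off with CompletableFuture; publishAndObserve and the consumer are hypothetical, and the URL is only an example value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class AddressFutureSketch {

    // Registers a consumer before the address is known, then completes the future,
    // following the order used by restAddressFuture in start().
    static String publishAndObserve(String restBaseUrl) {
        CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
        AtomicReference<String> seenByHandler = new AtomicReference<>();

        // Hypothetical consumer, e.g. a component that needs the final URL
        restAddressFuture.thenAccept(seenByHandler::set);

        // ... bind the server and compute the URL, then publish it ...
        restAddressFuture.complete(restBaseUrl);

        // Non-async callbacks registered on an incomplete future run in the
        // completing thread, so the consumer has already seen the value here.
        return seenByHandler.get();
    }

    public static void main(String[] args) {
        System.out.println(publishAndObserve("http://localhost:8081")); // example URL only
    }
}
```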

3.3.5 Set the endpoint state to RUNNING; at this point the WebMonitorEndpoint's Netty server is fully started

state = State.RUNNING;
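Setting `state = State.RUNNING` as the last step of start() means the endpoint only advertises itself as running once binding has succeeded. The sketch below shows such a guarded lifecycle; it assumes three states (CREATED, RUNNING, SHUTDOWN) and that start() is only legal from CREATED, which is an assumption for illustration rather than a copy of RestServerEndpoint.

```java
public class EndpointLifecycleSketch {

    // Assumed lifecycle states for this sketch.
    enum State {
        CREATED,
        RUNNING,
        SHUTDOWN
    }

    private final Object lock = new Object();
    private State state = State.CREATED;

    // start() is only legal once, from CREATED; RUNNING is set as the final step.
    void start() {
        synchronized (lock) {
            if (state != State.CREATED) {
                throw new IllegalStateException(
                        "Endpoint cannot be restarted, current state: " + state);
            }
            // ... register handlers, bind the server, complete the address future ...
            state = State.RUNNING;
        }
    }

    void shutdown() {
        synchronized (lock) {
            state = State.SHUTDOWN;
        }
    }

    State getState() {
        synchronized (lock) {
            return state;
        }
    }

    public static void main(String[] args) {
        EndpointLifecycleSketch endpoint = new EndpointLifecycleSketch();
        endpoint.start();
        System.out.println(endpoint.getState()); // RUNNING
        endpoint.shutdown();
        System.out.println(endpoint.getState()); // SHUTDOWN
    }
}
```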

3.4 A hook to start subclass-specific services


/**
 * Hook to start sub class specific services.
 *
 * @throws Exception if an error occurred
 */
protected abstract void startInternal() throws Exception;

Now let's look at the concrete startInternal implementation in the subclass:
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal

@Override
public void startInternal() throws Exception {
    // 1. Leader election
    leaderElectionService.start(this);
    startExecutionGraphCacheCleanupTask();

    if (hasWebUI) {
        log.info("Web frontend listening at {}.", getRestBaseUrl());
    }
}
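startInternal() is a Template Method hook: the base class owns the fixed startup sequence (handlers, Netty, state) and calls the hook at the end, while WebMonitorEndpoint plugs in leader election and cache cleanup. The sketch shows only the pattern; ServerEndpoint and MonitorEndpoint are hypothetical classes, and the checked exception from the real signature is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

public class StartHookSketch {

    // Base class: owns the fixed startup sequence and ends it with a hook,
    // in the role RestServerEndpoint#start plays.
    abstract static class ServerEndpoint {
        final List<String> log = new ArrayList<>();

        final void start() {
            log.add("register handlers");
            log.add("bind netty server");
            log.add("state = RUNNING");
            startInternal(); // subclass-specific services
        }

        protected abstract void startInternal();
    }

    // Subclass: adds its own services without changing the base sequence.
    static final class MonitorEndpoint extends ServerEndpoint {
        @Override
        protected void startInternal() {
            log.add("start leader election");
            log.add("schedule execution graph cache cleanup");
        }
    }

    public static void main(String[] args) {
        MonitorEndpoint endpoint = new MonitorEndpoint();
        endpoint.start();
        endpoint.log.forEach(System.out::println);
    }
}
```

Making start() final keeps subclasses from reordering the shared steps; they can only add work through the hook.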



3.4.1 Leader election

When HighAvailabilityServices is initialized, a different HighAvailabilityServices implementation is created depending on the configured high-availability type.

The leaderElectionService itself is obtained when the WebMonitorEndpoint is created, through this argument to createRestEndpoint:

highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),

3.4.1.1 Standalone mode: a StandaloneLeaderElectionService is created

@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    synchronized (lock) {
        checkNotShutdown();

        return new StandaloneLeaderElectionService();
    }
}


The election simply makes the contender the leader; here the contender is the WebMonitorEndpoint.

@Override
public void start(LeaderContender newContender) throws Exception {
    if (contender != null) {
        // Service was already started
        throw new IllegalArgumentException(
                "Leader election service cannot be started multiple times.");
    }

    contender = Preconditions.checkNotNull(newContender);

    // directly grant leadership to the given contender
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
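With only one possible candidate there is nothing to compete for, so standalone election reduces to "reject a second start, otherwise grant leadership at once with a fixed session ID". A sketch of that behavior; Contender and StandaloneElection are hypothetical stand-ins, and the all-zero UUID is this sketch's choice of fixed ID.

```java
import java.util.Objects;
import java.util.UUID;

public class StandaloneElectionSketch {

    // Hypothetical stand-in for the contender callback (WebMonitorEndpoint in the text).
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    // Fixed, well-known session ID; this sketch uses the all-zero UUID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    static final class StandaloneElection {
        private Contender contender;

        void start(Contender newContender) {
            if (contender != null) {
                throw new IllegalArgumentException(
                        "Leader election service cannot be started multiple times.");
            }
            contender = Objects.requireNonNull(newContender);
            // Only one candidate exists, so leadership is granted immediately.
            contender.grantLeadership(DEFAULT_LEADER_ID);
        }
    }

    public static void main(String[] args) {
        StandaloneElection election = new StandaloneElection();
        election.start(id -> System.out.println("granted leadership with session " + id));
    }
}
```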


3.4.1.2 ZooKeeper HA mode: a DefaultLeaderElectionService is created

org.apache.flink.runtime.highavailability.AbstractHaServices#getClusterRestEndpointLeaderElectionService

@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    // Implemented by subclasses: create the leader election service
    return createLeaderElectionService(getLeaderPathForRestServer());
}

The subclass implementation:

// org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
@Override
protected LeaderElectionService createLeaderElectionService(String leaderPath) {
    return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
}

// Create the DefaultLeaderElectionService
// org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
public static DefaultLeaderElectionService createLeaderElectionService(
        final CuratorFramework client, final String path) {
    return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
}

DefaultLeaderElectionService then starts the leader election; the contender passed in here is the WebMonitorEndpoint.

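Flink's leader election is built on the Curator framework. For each contender, a leader election driver (leaderElectionDriver) is created. Once the election is decided, the driver calls back one of two methods: isLeader if the contender won, or notLeader if it lost.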

// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start
@Override
public final void start(LeaderContender contender) throws Exception {
    checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    synchronized (lock) {
        running = true;
        /*
         * When called from WebMonitorEndpoint, the contender is the DispatcherRestEndpoint.
         * When called from ResourceManager, the contender is the ResourceManager.
         * When called from DispatcherRunner, the contender is the DispatcherRunner.
         */
        leaderContender = contender;

        // A leader election driver (leaderElectionDriver) is created for each contender
        leaderElectionDriver =
                leaderElectionDriverFactory.createLeaderElectionDriver(
                        this,
                        new LeaderElectionFatalErrorHandler(),
                        leaderContender.getDescription());
        LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
    }
}
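The callback chain described above runs from the coordination backend (Curator's LeaderLatch in ZooKeeper mode) to the driver's isLeader/notLeader, then to the election service passed in as `this`, and finally to the contender. The sketch models that chain in memory so it can run without ZooKeeper; every type in it is a hypothetical stand-in, and FakeDriver's methods are triggered by hand instead of by a real latch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ElectionCallbackSketch {

    // Hypothetical stand-ins for the contender and event-handler roles.
    interface Contender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    interface ElectionEventHandler {
        void onGrantLeadership(UUID leaderSessionId);

        void onRevokeLeadership();
    }

    // Driver role: the backend decides the outcome and the driver
    // turns isLeader/notLeader into event-handler calls.
    static final class FakeDriver {
        private final ElectionEventHandler handler;

        FakeDriver(ElectionEventHandler handler) {
            this.handler = handler;
        }

        void isLeader() {
            handler.onGrantLeadership(UUID.randomUUID()); // this sketch issues a fresh ID per grant
        }

        void notLeader() {
            handler.onRevokeLeadership();
        }
    }

    // Service role: creates one driver per contender and forwards callbacks to it.
    static final class ElectionService implements ElectionEventHandler {
        private Contender contender;
        FakeDriver driver;

        void start(Contender newContender) {
            if (contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            contender = newContender;
            driver = new FakeDriver(this); // the service passes itself as the event handler
        }

        @Override
        public void onGrantLeadership(UUID leaderSessionId) {
            contender.grantLeadership(leaderSessionId);
        }

        @Override
        public void onRevokeLeadership() {
            contender.revokeLeadership();
        }
    }

    // Records the callbacks it receives, standing in for WebMonitorEndpoint.
    static final class RecordingContender implements Contender {
        final List<String> events = new ArrayList<>();

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            events.add("granted");
        }

        @Override
        public void revokeLeadership() {
            events.add("revoked");
        }
    }

    public static void main(String[] args) {
        ElectionService service = new ElectionService();
        RecordingContender endpoint = new RecordingContender();
        service.start(endpoint);
        service.driver.isLeader(); // simulate winning the election
        service.driver.notLeader(); // simulate losing leadership later
        System.out.println(endpoint.events); // [granted, revoked]
    }
}
```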



ZooKeeperLeaderElectionDriverFactory creates a ZooKeeperLeaderElectionDriver. A LeaderElectionDriver is responsible for running the leader election and storing the leader information.

// org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#createLeaderElectionDriver
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
        LeaderElectionEventHandler leaderEventHandler, // the DefaultLeaderElectionService instance
        FatalErrorHandler fatalErrorHandler, // new LeaderElectionFatalErrorHandler()
        String leaderContenderDescription)
        throws Exception {
    return new ZooKeeperLeaderElectionDriver(
            client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
}


public ZooKeeperLeaderElectionDriver(
        CuratorFramework client,
        String path,
        LeaderElectionEventHandler leaderElectionEventHandler, // the DefaultLeaderElectionService passed in
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    checkNotNull(path);
    this.client = checkNotNull(client);
    this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
    this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

    leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
    leaderLatch = new LeaderLatch(client, leaderLatchPath);
    this.cache =
            ZooKeeperUtils.createTreeCache(
                    client

晓亮居士_264
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | Beijing Public Network Security Filing No. 11010802041100 | Beijing ICP Filing No. 19059560-4 | PHP1.CN 第一PHP社区 (First PHP Community), all rights reserved